package org.apache.hadoop.hdfs;

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import javax.security.auth.login.LoginException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Level;

/**
 * Main class for a series of name-node benchmarks.
 *
 * Each benchmark measures throughput and average execution time of a specific
 * name-node operation, e.g. file creation or block reports.
 *
 * The benchmark does not involve any other hadoop components except for the
 * name-node. Each operation is executed by calling directly the respective
 * name-node method. The name-node here is real; all other components are
 * simulated.
 *
 * Command line arguments for the benchmark include:<br>
 * 1) total number of operations to be performed,<br>
 * 2) number of threads to run these operations,<br>
 * 3) followed by operation specific input parameters.
 *
 * Then the benchmark generates inputs for each thread so that the input
 * generation overhead does not affect the resulting statistics. The number of
 * operations performed by each thread is practically the same. Precisely, the
 * difference between the number of operations performed by any two threads does
 * not exceed 1.
 *
 * Then the benchmark executes the specified number of operations using the
 * specified number of threads and outputs the resulting stats.
 */
public class NNThroughputBenchmark {
	// Logger for benchmark progress and statistics.
	private static final Log LOG = LogFactory
			.getLog(NNThroughputBenchmark.class);
	// Block size passed to create(); also used as the length argument of
	// getBlockLocations() in the open benchmark.
	private static final int BLOCK_SIZE = 16;

	// Configuration shared by all operations; assigned by the constructor.
	static Configuration config;
	// In-process name-node whose methods every operation calls directly.
	static NameNode nameNode;

	// Result file stream; printStats()/printResults() mirror their LOG output
	// here. Opened by the constructor.
	private static PrintStream res = null;
	// Logged-in user; also installed as current user in each StatsDaemon.
	private final UserGroupInformation ugi;

	/**
	 * Log in, configure and start an in-process name-node, and open the
	 * result file.
	 *
	 * @param conf
	 *            configuration; modified in place (handler count and exclude
	 *            file) and stored in {@link #config}
	 * @throws IOException
	 *             if the exclude file or the result file cannot be created,
	 *             or the name-node fails to start
	 * @throws LoginException
	 *             if the user login fails
	 */
	NNThroughputBenchmark(Configuration conf) throws IOException,
			LoginException {
		config = conf;
		ugi = UnixUserGroupInformation.login(config);
		UserGroupInformation.setCurrentUser(ugi);

		// We do not need many handlers, since each thread simulates a handler
		// by calling name-node methods directly
		config.setInt("dfs.namenode.handler.count", 1);
		// set exclude file
		config.set("dfs.hosts.exclude", "${hadoop.tmp.dir}/dfs/hosts/exclude");
		File excludeFile = new File(config.get("dfs.hosts.exclude", "exclude"));
		if (!excludeFile.exists()) {
			File excludeDir = excludeFile.getParentFile();
			// mkdirs() also returns false when the directory already exists,
			// so only fail if the directory is still missing afterwards.
			if (excludeDir != null && !excludeDir.mkdirs()
					&& !excludeDir.isDirectory())
				throw new IOException("NNThroughputBenchmark: cannot mkdir "
						+ excludeFile);
		}
		// create (or truncate to) an empty exclude file
		new FileOutputStream(excludeFile).close();
		// Start the NameNode
		String[] argv = new String[] {};
		nameNode = NameNode.createNameNode(argv, config);

		// create the result file; release a stream left by a previous instance
		if (res != null)
			res.close();
		res = new PrintStream(new FileOutputStream("/tmp/nnbenchmark.out"));
	}

	/**
	 * Stop the name-node and close the result file.
	 *
	 * The result stream is closed even if stopping the name-node fails.
	 *
	 * @throws IOException
	 *             propagated from stopping the name-node
	 */
	void close() throws IOException {
		try {
			nameNode.stop();
		} finally {
			if (res != null)
				res.close();
		}
	}

	/**
	 * Raise the name-node side loggers (NameNode, state change, network
	 * topology, FSNamesystem, audit and lease manager) to level ERROR.
	 */
	static void turnOffNameNodeLogging() {
		Log[] quietLogs = { NameNode.LOG, NameNode.stateChangeLog,
				NetworkTopology.LOG, FSNamesystem.LOG, FSNamesystem.auditLog,
				LeaseManager.LOG };
		for (Log quiet : quietLogs)
			((Log4JLogger) quiet).getLogger().setLevel(Level.ERROR);
	}

	/**
	 * Base class for collecting operation statistics.
	 *
	 * Extend this class in order to run statistics for a specific name-node
	 * operation. Subclasses provide the operation name, argument parsing,
	 * input generation and the timed operation; this class splits the
	 * requested operations across {@link StatsDaemon} threads, runs them and
	 * aggregates their counters.
	 */
	abstract class OperationStatsBase {
		protected static final String BASE_DIR_NAME = "/nnThroughputBenchmark";
		protected static final String OP_ALL_NAME = "all";
		protected static final String OP_ALL_USAGE = "-op all "
				+ "<other ops options> [-keepResults]";

		protected String baseDir;
		protected short replication;
		protected int numThreads = 0; // number of threads
		protected int numOpsRequired = 0; // number of operations requested
		protected int numOpsExecuted = 0; // number of operations executed
		protected long cumulativeTime = 0; // sum of times for each op
		protected long elapsedTime = 0; // time from start to finish
		protected boolean keepResults = false; // don't clean base directory on exit

		protected List<StatsDaemon> daemons;

		/**
		 * Operation name.
		 */
		abstract String getOpName();

		/**
		 * Parse command line arguments.
		 *
		 * @param args
		 *            arguments
		 * @throws IOException
		 */
		abstract void parseArguments(List<String> args) throws IOException;

		/**
		 * Generate inputs for each daemon thread.
		 *
		 * @param opsPerThread
		 *            number of inputs for each thread.
		 * @throws IOException
		 */
		abstract void generateInputs(int[] opsPerThread) throws IOException;

		/**
		 * This corresponds to the arg1 argument of
		 * {@link #executeOp(int, int, String)}, which can have different
		 * meanings depending on the operation performed.
		 *
		 * @param daemonId
		 * @return the argument
		 */
		abstract String getExecutionArgument(int daemonId);

		/**
		 * Execute name-node operation.
		 *
		 * @param daemonId
		 *            id of the daemon calling this method.
		 * @param inputIdx
		 *            serial index of the operation called by the daemon.
		 * @param arg1
		 *            operation specific argument.
		 * @return time of the individual name-node call.
		 * @throws IOException
		 */
		abstract long executeOp(int daemonId, int inputIdx, String arg1)
				throws IOException;

		/**
		 * Print the results of the benchmarking.
		 */
		abstract void printResults();

		OperationStatsBase() {
			baseDir = BASE_DIR_NAME + "/" + getOpName();
			replication = (short) config.getInt("dfs.replication", 3);
			numOpsRequired = 10;
			numThreads = 3;
		}

		/**
		 * Generate inputs, run all daemons to completion and aggregate their
		 * counters into numOpsExecuted / cumulativeTime / elapsedTime.
		 *
		 * @throws IOException
		 *             if input generation fails
		 */
		void benchmark() throws IOException {
			daemons = new ArrayList<StatsDaemon>();
			long start = 0;
			try {
				numOpsExecuted = 0;
				cumulativeTime = 0;
				if (numThreads < 1)
					return;
				int opsPerThread[] = distributeOps();
				turnOffNameNodeLogging();
				generateInputs(opsPerThread);
				for (int tIdx = 0; tIdx < numThreads; tIdx++)
					daemons.add(new StatsDaemon(tIdx, opsPerThread[tIdx], this));
				start = System.currentTimeMillis();
				LOG.info("Starting " + numOpsRequired + " " + getOpName()
						+ "(s).");
				for (StatsDaemon d : daemons)
					d.start();
			} finally {
				waitForDaemons();
				// start == 0 means no daemon was ever started
				elapsedTime = (start == 0) ? 0 : System.currentTimeMillis()
						- start;
				for (StatsDaemon d : daemons)
					incrementStats(d.localNumOpsExecuted, d.localCumulativeTime);
			}
		}

		/**
		 * Split numOpsRequired across numThreads. Each thread takes
		 * floor(remaining ops / remaining threads), but at least one, until
		 * all ops are scheduled; threads left over get zero operations (the
		 * array is zero-initialized).
		 */
		private int[] distributeOps() {
			int[] opsPerThread = new int[numThreads];
			int tIdx = 0; // thread index < numThreads
			for (int opsScheduled = 0; opsScheduled < numOpsRequired; opsScheduled += opsPerThread[tIdx++]) {
				opsPerThread[tIdx] = (numOpsRequired - opsScheduled)
						/ (numThreads - tIdx);
				if (opsPerThread[tIdx] == 0)
					opsPerThread[tIdx] = 1;
			}
			return opsPerThread;
		}

		/**
		 * Block until every daemon thread has terminated. Joining, instead of
		 * polling op counters, also returns for daemons that died before
		 * finishing their share (run() logs and swallows IOException) and for
		 * daemons that were never started. An interrupt does not abort the
		 * wait; the interrupt flag is restored once all daemons are done.
		 */
		private void waitForDaemons() {
			boolean interrupted = false;
			for (StatsDaemon d : daemons) {
				while (d.isAlive()) {
					try {
						d.join();
					} catch (InterruptedException e) {
						interrupted = true;
					}
				}
			}
			if (interrupted)
				Thread.currentThread().interrupt();
		}

		/**
		 * @return true if any daemon reports that it still has operations to
		 *         run
		 */
		private boolean isInPorgress() {
			for (StatsDaemon d : daemons)
				if (d.isInProgress())
					return true;
			return false;
		}

		void cleanUp() throws IOException {
			nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
			if (!keepResults)
				nameNode.delete(getBaseDir(), true);
		}

		int getNumOpsExecuted() {
			return numOpsExecuted;
		}

		long getCumulativeTime() {
			return cumulativeTime;
		}

		long getElapsedTime() {
			return elapsedTime;
		}

		long getAverageTime() {
			return numOpsExecuted == 0 ? 0 : cumulativeTime / numOpsExecuted;
		}

		double getOpsPerSecond() {
			return elapsedTime == 0 ? 0 : 1000 * (double) numOpsExecuted
					/ elapsedTime;
		}

		String getBaseDir() {
			return baseDir;
		}

		String getClientName(int idx) {
			return getOpName() + "-client-" + idx;
		}

		void incrementStats(int ops, long time) {
			numOpsExecuted += ops;
			cumulativeTime += time;
		}

		/**
		 * Parse first 2 arguments, corresponding to the "-op" option, and
		 * consume an optional "-keepResults" flag (removed from args).
		 * printUsage() is called on malformed input; it is defined elsewhere
		 * and presumably does not return.
		 *
		 * @param args
		 *            command line arguments; must be modifiable if it
		 *            contains "-keepResults"
		 * @return true if operation is all, which means that options not
		 *         related to this operation should be ignored, or false
		 *         otherwise, meaning that usage should be printed when an
		 *         unrelated option is encountered.
		 */
		protected boolean verifyOpArgument(List<String> args) {
			if (args.size() < 2 || !args.get(0).startsWith("-op"))
				printUsage();
			int krIndex = args.indexOf("-keepResults");
			keepResults = (krIndex >= 0);
			if (keepResults) {
				args.remove(krIndex);
			}
			String type = args.get(1);
			if (OP_ALL_NAME.equals(type))
				return true;
			if (!getOpName().equals(type))
				printUsage();
			return false;
		}

		void printStats() {
			LOG.info("--- " + getOpName() + " stats  ---");
			LOG.info("# operations: " + getNumOpsExecuted());
			LOG.info("Elapsed Time: " + getElapsedTime());
			LOG.info(" Ops per sec: " + getOpsPerSecond());
			LOG.info("Average Time: " + getAverageTime());

			// make the output for result
			res.println("--- " + getOpName() + " stats  ---");
			res.println("# operations: " + getNumOpsExecuted());
			res.println("Elapsed Time: " + getElapsedTime());
			res.println(" Ops per sec: " + getOpsPerSecond());
			res.println("Average Time: " + getAverageTime());
		}
	}

	/**
	 * One of the threads that perform stats operations.
	 *
	 * Runs {@code opsPerThread} operations of its {@link OperationStatsBase}
	 * and accumulates the number and total time of the operations it
	 * completed.
	 */
	private class StatsDaemon extends Thread {
		private final int daemonId;
		// volatile: terminate() may be called from another thread
		private volatile int opsPerThread;
		private String arg1; // argument passed to executeOp()
		private volatile int localNumOpsExecuted = 0;
		private volatile long localCumulativeTime = 0;
		private final OperationStatsBase statsOp;

		StatsDaemon(int daemonId, int nrOps, OperationStatsBase op) {
			this.daemonId = daemonId;
			this.opsPerThread = nrOps;
			this.statsOp = op;
			setName(toString());
		}

		public void run() {
			UserGroupInformation.setCurrentUser(ugi);
			localNumOpsExecuted = 0;
			localCumulativeTime = 0;
			arg1 = statsOp.getExecutionArgument(daemonId);
			try {
				benchmarkOne();
			} catch (IOException ex) {
				LOG.error("StatsDaemon " + daemonId + " failed: \n"
						+ StringUtils.stringifyException(ex));
			}
		}

		public String toString() {
			return "StatsDaemon-" + daemonId;
		}

		void benchmarkOne() throws IOException {
			for (int idx = 0; idx < opsPerThread; idx++) {
				long stat = statsOp.executeOp(daemonId, idx, arg1);
				localNumOpsExecuted++;
				localCumulativeTime += stat;
			}
		}

		/**
		 * @return true while the thread is running and has not yet executed
		 *         all of its operations. Returns false once the thread has
		 *         terminated even if it did not finish its share (e.g. after
		 *         an IOException), so callers polling this do not spin
		 *         forever.
		 */
		boolean isInProgress() {
			return isAlive() && localNumOpsExecuted < opsPerThread;
		}

		/**
		 * Schedule to stop this daemon.
		 */
		void terminate() {
			opsPerThread = localNumOpsExecuted;
		}
	}

	/**
	 * Clean all benchmark result directories.
	 *
	 * A single operation on a single thread recursively deletes
	 * {@code BASE_DIR_NAME}.
	 */
	class CleanAllStats extends OperationStatsBase {
		// Operation types
		static final String OP_CLEAN_NAME = "clean";
		static final String OP_CLEAN_USAGE = "-op clean";

		CleanAllStats(List<String> args) {
			super();
			parseArguments(args);
			// the whole clean-up is one recursive delete
			numThreads = 1;
			numOpsRequired = 1;
			keepResults = true;
		}

		String getOpName() {
			return OP_CLEAN_NAME;
		}

		/**
		 * Accept only "-op clean" (plus options when running "all").
		 */
		void parseArguments(List<String> args) {
			boolean ignoreUnrelated = verifyOpArgument(args);
			boolean hasExtraArgs = args.size() > 2;
			if (hasExtraArgs && !ignoreUnrelated)
				printUsage();
		}

		void generateInputs(int[] opsPerThread) throws IOException {
			// nothing to prepare
		}

		/**
		 * No per-daemon argument is needed.
		 */
		String getExecutionArgument(int daemonId) {
			return null;
		}

		/**
		 * Leave safe mode, then time the recursive removal of the benchmark
		 * root directory.
		 */
		long executeOp(int daemonId, int inputIdx, String ignore)
				throws IOException {
			nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
			final long startMs = System.currentTimeMillis();
			nameNode.delete(BASE_DIR_NAME, true);
			return System.currentTimeMillis() - startMs;
		}

		void printResults() {
			String header = "--- " + getOpName() + " inputs ---";
			String detail = "Remove directory " + BASE_DIR_NAME;
			LOG.info(header);
			LOG.info(detail);
			res.println(header);
			res.println(detail);
			printStats();
		}
	}

	/**
	 * File creation statistics.
	 *
	 * Each thread creates the same (+ or -1) number of files. File names are
	 * generated in {@link #generateInputs(int[])}, before timing starts. The
	 * created files do not have blocks; with "-close" each file is also
	 * completed (outside the measured interval).
	 */
	class CreateFileStats extends OperationStatsBase {
		// Operation types
		static final String OP_CREATE_NAME = "create";
		static final String OP_CREATE_USAGE = "-op create [-threads T] [-files N] [-filesPerDir P] [-close]";

		protected FileNameGenerator nameGenerator;
		protected String[][] fileNames;
		private boolean closeUponCreate;

		CreateFileStats(List<String> args) {
			super();
			parseArguments(args);
		}

		String getOpName() {
			return OP_CREATE_NAME;
		}

		/**
		 * Parse -files, -threads, -filesPerDir (each followed by an integer)
		 * and -close, then build the file name generator.
		 */
		void parseArguments(List<String> args) {
			final boolean ignoreUnrelatedOptions = verifyOpArgument(args);
			int filesPerDir = 4;
			closeUponCreate = false;
			int i = 2;
			while (i < args.size()) {
				final String opt = args.get(i);
				if (opt.equals("-files") || opt.equals("-threads")
						|| opt.equals("-filesPerDir")) {
					if (i + 1 == args.size())
						printUsage();
					final int value = Integer.parseInt(args.get(++i));
					if (opt.equals("-files"))
						numOpsRequired = value;
					else if (opt.equals("-threads"))
						numThreads = value;
					else
						filesPerDir = value;
				} else if (opt.equals("-close")) {
					closeUponCreate = true;
				} else if (!ignoreUnrelatedOptions) {
					printUsage();
				}
				i++;
			}
			nameGenerator = new FileNameGenerator(getBaseDir(), filesPerDir);
		}

		/**
		 * Leave safe mode and pre-generate opsPerThread[t] file names for
		 * every thread t.
		 */
		void generateInputs(int[] opsPerThread) throws IOException {
			assert opsPerThread.length == numThreads : "Error opsPerThread.length";
			nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
			LOG.info("Generate " + numOpsRequired + " intputs for "
					+ getOpName());
			fileNames = new String[numThreads][];
			for (int t = 0; t < numThreads; t++) {
				fileNames[t] = new String[opsPerThread[t]];
				String[] names = fileNames[t];
				for (int k = 0; k < names.length; k++)
					names[k] = nameGenerator.getNextFileName("ThroughputBench");
			}
		}

		void dummyActionNoSynch(int daemonId, int fileIdx) {
			String name = fileNames[daemonId][fileIdx];
			for (int i = 0; i < 2000; i++)
				name.contains(String.valueOf(i));
		}

		/**
		 * Each daemon uses its own client name.
		 */
		String getExecutionArgument(int daemonId) {
			return getClientName(daemonId);
		}

		/**
		 * Time one create() call; when -close was given, then retry
		 * complete() until it returns true (not included in the time).
		 */
		long executeOp(int daemonId, int inputIdx, String clientName)
				throws IOException {
			final String fileName = fileNames[daemonId][inputIdx];
			final long start = System.currentTimeMillis();
			nameNode.create(fileName, FsPermission.getDefault(), clientName,
					true, replication, BLOCK_SIZE);
			final long elapsed = System.currentTimeMillis() - start;
			if (closeUponCreate) {
				boolean written;
				do {
					written = nameNode.complete(fileName, clientName);
				} while (!written);
			}
			return elapsed;
		}

		void printResults() {
			String[] lines = new String[] {
					"--- " + getOpName() + " inputs ---",
					"nrFiles = " + numOpsRequired,
					"nrThreads = " + numThreads,
					"nrFilesPerDir = " + nameGenerator.getFilesPerDirectory() };
			for (String line : lines)
				LOG.info(line);
			for (String line : lines)
				res.println(line);
			printStats();
		}
	}

	/**
	 * Creates files with {@code blocksPerFile} blocks each, registering every
	 * block replica with the simulated data-nodes. Used to populate the
	 * name-space before block report benchmarking.
	 *
	 * Argument parsing and file-name generation are inherited from
	 * {@link CreateFileStats}, so it runs under the "create" operation name.
	 */
	class CreateFilesWithBlocks extends CreateFileStats {

		private int blocksPerFile;
		private TinyDatanode[] datanodes; // sorted by name (binary searched)
		// fileIdxOffsets[t] = number of files assigned to threads 0..t-1, so
		// (daemonId, inputIdx) maps to a unique global file index.
		private int[] fileIdxOffsets;

		private CreateFilesWithBlocks(List<String> args,
				TinyDatanode[] datanodes, int blocksPerFiles) {
			super(args);
			this.datanodes = datanodes;
			this.blocksPerFile = blocksPerFiles;
		}

		/**
		 * Generate file names and record each thread's global file offset.
		 */
		void generateInputs(int[] opsPerThread) throws IOException {
			super.generateInputs(opsPerThread);
			fileIdxOffsets = new int[opsPerThread.length];
			int offset = 0;
			for (int t = 0; t < opsPerThread.length; t++) {
				fileIdxOffsets[t] = offset;
				offset += opsPerThread[t];
			}
		}

		/**
		 * Create one file, allocate its blocks, send a heartbeat from one
		 * data-node (round-robin by global file index) and complete the file.
		 */
		long executeOp(int daemonId, int inputIdx, String clientName)
				throws IOException {
			String fileName = fileNames[daemonId][inputIdx];
			long start = System.currentTimeMillis();
			nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
			nameNode.create(fileName, FsPermission.getDefault(), clientName,
					true, replication, BLOCK_SIZE);
			addBlocks(fileName, clientName);
			int fileIdx = fileIdxOffsets[daemonId] + inputIdx;
			datanodes[fileIdx % datanodes.length].sendHeartbeat();
			nameNode.complete(fileName, clientName);
			long end = System.currentTimeMillis();
			return end - start;
		}

		/**
		 * Allocate blocksPerFile blocks for the file and report each replica
		 * as received by the data-node the name-node chose for it.
		 */
		private void addBlocks(String fileName, String clientName)
				throws IOException {
			for (int jdx = 0; jdx < blocksPerFile; jdx++) {
				LocatedBlock loc = nameNode.addBlock(fileName, clientName);
				for (DatanodeInfo dnInfo : loc.getLocations()) {
					int dnIdx = Arrays
							.binarySearch(datanodes, dnInfo.getName());
					TinyDatanode dn = datanodes[dnIdx];
					// several creator threads may place replicas on the same
					// simulated data-node concurrently
					synchronized (dn) {
						dn.addBlock(loc.getBlock());
					}
					Block[] bi = new Block[] { loc.getBlock() };
					nameNode.blockReceivedAndDeleted(dn.dnRegistration, bi);
				}
			}
		}
	}

	/**
	 * Open file statistics.
	 *
	 * Measure how many open calls (getBlockLocations()) the name-node can
	 * handle per second. Input files are produced by a "create -close" run
	 * (unless -useExisting) whose directory is then renamed to this
	 * operation's base directory.
	 */
	class OpenFileStats extends CreateFileStats {
		// Operation types
		static final String OP_OPEN_NAME = "open";
		static final String OP_USAGE_ARGS = " [-threads T] [-files N] [-filesPerDir P] [-useExisting]";
		static final String OP_OPEN_USAGE = "-op " + OP_OPEN_NAME
				+ OP_USAGE_ARGS;

		// Assigned by parseArguments(), which runs inside the superclass
		// constructor; it must not get an initializer, which would run
		// afterwards and overwrite the parsed value.
		private boolean useExisting; // do not generate files, use existing ones

		OpenFileStats(List<String> args) {
			super(args);
		}

		String getOpName() {
			return OP_OPEN_NAME;
		}

		/**
		 * Strip the "-useExisting" flag, then parse the remaining create
		 * options.
		 */
		void parseArguments(List<String> args) {
			final int flagPos = args.indexOf("-useExisting");
			useExisting = flagPos >= 0;
			if (flagPos >= 0)
				args.remove(flagPos);
			super.parseArguments(args);
		}

		/**
		 * Create (or assume) the input files, generate the same file names
		 * for this operation and move the created directory into place.
		 */
		void generateInputs(int[] opsPerThread) throws IOException {
			List<String> createArgs = Arrays.asList("-op", "create",
					"-threads", String.valueOf(this.numThreads), "-files",
					String.valueOf(numOpsRequired), "-filesPerDir",
					String.valueOf(nameGenerator.getFilesPerDirectory()),
					"-close");
			CreateFileStats opCreate = new CreateFileStats(createArgs);

			if (useExisting) {
				LOG.info("useExisting = true. Assuming " + numOpsRequired
						+ " files have been created before.");
			} else {
				opCreate.benchmark();
				LOG.info("Created " + numOpsRequired + " files.");
			}
			// same generator settings => same file names as opCreate
			super.generateInputs(opsPerThread);
			String createdDir = opCreate.getBaseDir();
			String ownDir = getBaseDir();
			if (nameNode.getFileInfo(createdDir) != null
					&& nameNode.getFileInfo(ownDir) == null)
				nameNode.rename(createdDir, ownDir);
			if (nameNode.getFileInfo(ownDir) == null)
				throw new IOException(ownDir + " does not exist.");
		}

		/**
		 * Time a getBlockLocations() call on one pre-generated file.
		 */
		long executeOp(int daemonId, int inputIdx, String ignore)
				throws IOException {
			final String path = fileNames[daemonId][inputIdx];
			final long t0 = System.currentTimeMillis();
			nameNode.getBlockLocations(path, 0L, BLOCK_SIZE);
			return System.currentTimeMillis() - t0;
		}
	}

	/**
	 * Delete file statistics.
	 *
	 * Measure how many delete calls the name-node can handle per second.
	 * Inputs are prepared by the inherited {@link OpenFileStats} logic.
	 */
	class DeleteFileStats extends OpenFileStats {
		// Operation types
		static final String OP_DELETE_NAME = "delete";
		static final String OP_DELETE_USAGE = "-op " + OP_DELETE_NAME
				+ OP_USAGE_ARGS;

		DeleteFileStats(List<String> args) {
			super(args);
		}

		String getOpName() {
			return OP_DELETE_NAME;
		}

		/**
		 * Time a non-recursive delete of one pre-generated file.
		 */
		long executeOp(int daemonId, int inputIdx, String ignore)
				throws IOException {
			final String target = fileNames[daemonId][inputIdx];
			final long t0 = System.currentTimeMillis();
			nameNode.delete(target, false);
			return System.currentTimeMillis() - t0;
		}
	}

	/**
	 * Rename file statistics.
	 *
	 * Measure how many rename calls the name-node can handle per second.
	 * Every file is renamed to its own name with a ".r" suffix.
	 */
	class RenameFileStats extends OpenFileStats {
		// Operation types
		static final String OP_RENAME_NAME = "rename";
		static final String OP_RENAME_USAGE = "-op " + OP_RENAME_NAME
				+ OP_USAGE_ARGS;

		protected String[][] destNames;

		RenameFileStats(List<String> args) {
			super(args);
		}

		String getOpName() {
			return OP_RENAME_NAME;
		}

		/**
		 * Prepare source files as for open, then derive destination names.
		 */
		void generateInputs(int[] opsPerThread) throws IOException {
			super.generateInputs(opsPerThread);
			destNames = new String[fileNames.length][];
			for (int t = 0; t < numThreads; t++) {
				String[] src = fileNames[t];
				String[] dst = new String[src.length];
				destNames[t] = dst;
				for (int k = 0; k < src.length; k++)
					dst[k] = src[k] + ".r";
			}
		}

		/**
		 * Time one rename from the source name to its ".r" destination.
		 */
		long executeOp(int daemonId, int inputIdx, String ignore)
				throws IOException {
			final String from = fileNames[daemonId][inputIdx];
			final String to = destNames[daemonId][inputIdx];
			final long t0 = System.currentTimeMillis();
			nameNode.rename(from, to);
			return System.currentTimeMillis() - t0;
		}
	}

	/**
	 * Minimal data-node simulator.
	 *
	 * Holds a registration with the name-node and a fixed-capacity block
	 * array that is sent as this node's block report. Instances compare
	 * against node names so {@code Arrays.binarySearch} can locate a node
	 * by name in a name-sorted array.
	 */
	private static class TinyDatanode implements Comparable<String> {
		private static final long DF_CAPACITY = 100 * 1024 * 1024;
		private static final long DF_USED = 0;

		NamespaceInfo nsInfo;
		DatanodeRegistration dnRegistration;
		Block[] blocks;
		int nrBlocks; // actual number of blocks

		/**
		 * Get data-node name in the form {@code <host name>:<port>} where
		 * port is a 6 digit integer. This is necessary in order to provide
		 * lexicographic ordering. Host names are all the same, the ordering
		 * goes by port numbers.
		 *
		 * @throws IOException
		 *             if the port does not fit in 6 digits
		 */
		private static String getNodeName(int port) throws IOException {
			String machineName = DNS.getDefaultHost("default", "default");
			String sPort = String.valueOf(100000 + port);
			if (sPort.length() > 6)
				throw new IOException("Too many data-nodes.");
			return machineName + ":" + sPort;
		}

		TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
			dnRegistration = new DatanodeRegistration(getNodeName(dnIdx));
			this.blocks = new Block[blockCapacity];
			this.nrBlocks = 0;
		}

		String getName() {
			return dnRegistration.getName();
		}

		/**
		 * Fetch namespace info, assign a new storage id and register with the
		 * name-node, keeping the registration it returns.
		 */
		void register() throws IOException {
			// get versions from the namenode
			nsInfo = nameNode.versionRequest();
			dnRegistration.setStorageInfo(new DataStorage(nsInfo, "", null), "");
			dnRegistration.storageID = DataNode.createNewStorageId(dnRegistration.getPort());
			// register datanode
			dnRegistration = nameNode.register(dnRegistration);
		}

		/**
		 * Send a heartbeat to the name-node. Ignore reply commands.
		 */
		void sendHeartbeat() throws IOException {
			DatanodeCommand[] cmds = nameNode.sendHeartbeat(dnRegistration,
					DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0);
			if (cmds != null) {
				for (DatanodeCommand cmd : cmds) {
					LOG.debug("sendHeartbeat Name-node reply: "
							+ cmd.getAction());
				}
			}
		}

		/**
		 * Record a block as stored on this node. Synchronized because several
		 * file-creating threads may place replicas on the same node at once.
		 *
		 * @return false if the node is already at capacity
		 */
		synchronized boolean addBlock(Block blk) {
			if (nrBlocks == blocks.length) {
				LOG.debug("Cannot add block: datanode capacity = "
						+ blocks.length);
				return false;
			}
			blocks[nrBlocks] = blk;
			nrBlocks++;
			return true;
		}

		/**
		 * Fill the unused report slots with blocks that do not exist.
		 */
		synchronized void formBlockReport() {
			for (int idx = blocks.length - 1; idx >= nrBlocks; idx--)
				blocks[idx] = new Block(blocks.length - idx, 0, 0);
		}

		public int compareTo(String name) {
			return getName().compareTo(name);
		}

		/**
		 * Send a heartbeat to the name-node and replicate blocks if requested.
		 *
		 * @return number of blocks transferred for the first DNA_TRANSFER
		 *         command in the reply, or 0
		 */
		@SuppressWarnings("unused")
		int replicateBlocks() throws IOException {
			DatanodeCommand[] cmds = nameNode.sendHeartbeat(dnRegistration,
					DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0, 0);
			if (cmds != null) {
				for (DatanodeCommand cmd : cmds) {
					if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
						// Send a copy of a block to another datanode
						BlockCommand bcmd = (BlockCommand) cmd;
						return transferBlocks(bcmd.getBlocks(),
								bcmd.getTargets());
					}
				}
			}
			return 0;
		}

		/**
		 * Transfer blocks to another data-node. Just report on behalf of the
		 * other data-node that the blocks have been received, then report the
		 * blocks as deleted from this node.
		 */
		private int transferBlocks(Block blocks[], DatanodeInfo xferTargets[][])
				throws IOException {
			for (int i = 0; i < blocks.length; i++) {
				DatanodeInfo blockTargets[] = xferTargets[i];
				for (int t = 0; t < blockTargets.length; t++) {
					DatanodeInfo dnInfo = blockTargets[t];
					DatanodeRegistration receivedDNReg;
					receivedDNReg = new DatanodeRegistration(dnInfo.getName());
					receivedDNReg.setStorageInfo(new DataStorage(nsInfo, dnInfo
							.getStorageID(), null), dnInfo.getStorageID());
					receivedDNReg.setInfoPort(dnInfo.getInfoPort());
					Block[] bi = new Block[] { blocks[i] };
					nameNode.blockReceivedAndDeleted(receivedDNReg, bi);
				}
			}
			// simulate block deletion from src
			Block[] blocksDeleted = new Block[blocks.length];
			for (int i = 0; i < blocksDeleted.length; i++) {
				blocksDeleted[i] = new Block(blocks[i]);
				DFSUtil.markAsDeleted(blocksDeleted[i]);
			}
			nameNode.blockReceivedAndDeleted(dnRegistration, blocksDeleted);
			return blocks.length;
		}
	}

	/**
	 * Block report statistics.
	 *
	 * Each thread here represents its own data-node. Data-nodes send the same
	 * block report each time. The block report may contain missing or
	 * non-existing blocks.
	 */
	class BlockReportStats extends OperationStatsBase {
		// Operation name and usage string for "-op blockReport".
		static final String OP_BLOCK_REPORT_NAME = "blockReport";
		static final String OP_BLOCK_REPORT_USAGE = "-op blockReport [-datanodes T] [-reports N] "
				+ "[-blocksPerReport B] [-blocksPerFile F]";

		private int blocksPerReport; // block capacity of each simulated data-node's report
		private int blocksPerFile; // blocks allocated per generated input file
		private TinyDatanode[] datanodes; // array of data-nodes sorted by name

		BlockReportStats(List<String> args) {
			super();
			this.blocksPerReport = 100;
			this.blocksPerFile = 10;
			// set heartbeat interval to 3 min, so that expiration were 40 min
			config.setLong("dfs.heartbeat.interval", 3 * 60);
			parseArguments(args);
			// adjust replication to the number of data-nodes
			this.replication = (short) Math.min((int) replication,
					getNumDatanodes());
		}

		/**
		 * @return number of simulated data-nodes; every daemon thread acts as
		 *         one data-node, so this equals the thread count
		 */
		private int getNumDatanodes() {
			return this.numThreads;
		}

		/** @return the "blockReport" operation name */
		String getOpName() {
			return BlockReportStats.OP_BLOCK_REPORT_NAME;
		}

		/**
		 * Parse -reports, -datanodes, -blocksPerReport and -blocksPerFile,
		 * each followed by an integer value.
		 */
		void parseArguments(List<String> args) {
			final boolean ignoreUnrelatedOptions = verifyOpArgument(args);
			int i = 2;
			while (i < args.size()) {
				final String opt = args.get(i);
				if (opt.equals("-reports") || opt.equals("-datanodes")
						|| opt.equals("-blocksPerReport")
						|| opt.equals("-blocksPerFile")) {
					if (i + 1 == args.size())
						printUsage();
					final int value = Integer.parseInt(args.get(++i));
					if (opt.equals("-reports"))
						numOpsRequired = value;
					else if (opt.equals("-datanodes"))
						numThreads = value;
					else if (opt.equals("-blocksPerReport"))
						blocksPerReport = value;
					else
						blocksPerFile = value;
				} else if (!ignoreUnrelatedOptions) {
					printUsage();
				}
				i++;
			}
		}

		/**
		 * Register the simulated data-nodes, create enough files with blocks
		 * to fill about blocksPerReport blocks per node, and pad each node's
		 * report to full size.
		 *
		 * @param ignore
		 *            unused; every daemon reports for its own data-node
		 * @throws IOException
		 *             if registration or file creation fails
		 */
		void generateInputs(int[] ignore) throws IOException {
			int nrDatanodes = getNumDatanodes();
			int nrBlocks = (int) Math.ceil((double) blocksPerReport
					* nrDatanodes / replication);
			int nrFiles = (int) Math.ceil((double) nrBlocks / blocksPerFile);
			datanodes = new TinyDatanode[nrDatanodes];
			// create data-nodes
			String prevDNName = "";
			for (int idx = 0; idx < nrDatanodes; idx++) {
				datanodes[idx] = new TinyDatanode(idx, blocksPerReport);
				datanodes[idx].register();
				assert datanodes[idx].getName().compareTo(prevDNName) > 0 : "Data-nodes must be sorted lexicographically.";
				datanodes[idx].sendHeartbeat();
				prevDNName = datanodes[idx].getName();
			}

			// create files
			LOG.info("Creating " + nrFiles + " with " + blocksPerFile
					+ " blocks each.");

			// files are created concurrently by 400 client threads
			String[] args = (new String[] { "-op", "create", "-threads",
					String.valueOf(400), "-files", String.valueOf(nrFiles),
					"-close" });

			CreateFilesWithBlocks creatfiles = new CreateFilesWithBlocks(
					Arrays.asList(args), datanodes, blocksPerFile);
			// The creator reads dfs.replication from the config; use the
			// value clamped to the number of simulated data-nodes, which is
			// what nrBlocks above was computed with.
			creatfiles.replication = replication;

			// generate files with multi threads
			creatfiles.benchmark();
			if (creatfiles.getNumOpsExecuted() < nrFiles)
				LOG.warn("Only " + creatfiles.getNumOpsExecuted() + " of "
						+ nrFiles + " input files were created.");

			// prepare block reports
			for (int idx = 0; idx < nrDatanodes; idx++) {
				datanodes[idx].formBlockReport();
			}
		}

		/**
		 * Allocates {@code blocksPerFile} blocks for the given file.
		 *
		 * For every location returned by the name-node, it records the block
		 * on the matching simulated data-node and reports the block back to
		 * the name-node on that data-node's behalf.
		 *
		 * @param fileName
		 *            file to add blocks to
		 * @param clientName
		 *            client name passed through to the name-node
		 * @throws IOException
		 *             if a name-node call fails, or if a returned location
		 *             does not match any simulated data-node
		 */
		private void addBlocks(String fileName, String clientName)
				throws IOException {
			for (int jdx = 0; jdx < blocksPerFile; jdx++) {
				LocatedBlock loc = nameNode.addBlock(fileName, clientName);
				Block block = loc.getBlock();
				for (DatanodeInfo dnInfo : loc.getLocations()) {
					// datanodes[] is sorted by name (asserted in generateInputs)
					int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
					if (dnIdx < 0) {
						// a negative index would otherwise fail with an opaque
						// ArrayIndexOutOfBoundsException
						throw new IOException("Block " + block
								+ " was placed on unknown data-node "
								+ dnInfo.getName());
					}
					datanodes[dnIdx].addBlock(block);
					nameNode.blockReceivedAndDeleted(
							datanodes[dnIdx].dnRegistration,
							new Block[] { block });
				}
			}
		}

		/**
		 * Block reports are not parameterized per operation, so there is no
		 * execution argument to hand out.
		 *
		 * @param daemonId
		 *            ignored
		 * @return always {@code null}
		 */
		String getExecutionArgument(int daemonId) {
			return null;
		}

		/**
		 * Sends one full block report for the data-node owned by the given
		 * daemon and times the name-node call.
		 *
		 * @param daemonId
		 *            daemon index, also used as the data-node index
		 * @param inputIdx
		 *            unused
		 * @param ignore
		 *            unused
		 * @return elapsed milliseconds, including encoding of the block list
		 */
		long executeOp(int daemonId, int inputIdx, String ignore)
				throws IOException {
			assert daemonId < numThreads : "Wrong daemonId.";
			final TinyDatanode reporter = datanodes[daemonId];
			final long startedAt = System.currentTimeMillis();
			// the encoding step is intentionally inside the timed region
			nameNode.blockReport(reporter.dnRegistration,
					BlockListAsLongs.convertToArrayLongs(reporter.blocks));
			final long finishedAt = System.currentTimeMillis();
			return finishedAt - startedAt;
		}

		/**
		 * Writes the benchmark inputs to both the log and the results stream.
		 *
		 * The inputs include the number of blocks held by each simulated
		 * data-node. The method then prints the collected statistics.
		 */
		void printResults() {
			// builds "(n0, n1, ...)"; yields "()" when there are no data-nodes
			StringBuilder blockDistribution = new StringBuilder("(");
			int nrDatanodes = getNumDatanodes();
			for (int idx = 0; idx < nrDatanodes; idx++) {
				if (idx > 0) {
					blockDistribution.append(", ");
				}
				blockDistribution.append(datanodes[idx].nrBlocks);
			}
			blockDistribution.append(')');

			String[] inputs = { "--- " + getOpName() + " inputs ---",
					"reports = " + numOpsRequired,
					"datanodes = " + numThreads + " " + blockDistribution,
					"blocksPerReport = " + blocksPerReport,
					"blocksPerFile = " + blocksPerFile };
			for (String line : inputs) {
				LOG.info(line);
			}
			for (String line : inputs) {
				res.println(line);
			}
			printStats();
		}
	} // end BlockReportStats

	/**
	 * Measures how fast replication monitor can compute data-node work.
	 *
	 * It runs only one thread until no more work can be scheduled.
	 */
	class ReplicationStats extends OperationStatsBase {
		static final String OP_REPLICATION_NAME = "replication";
		static final String OP_REPLICATION_USAGE = "-op replication [-datanodes T] [-nodesToDecommission D] "
				+ "[-nodeReplicationLimit C] [-totalBlocks B] [-replication R]";

		/** Creates the data-nodes, files and initial block reports. */
		private BlockReportStats blockReportObject;
		private int numDatanodes;
		private int nodesToDecommission;
		private int nodeReplicationLimit;
		private int totalBlocks;
		/** Sum of nrBlocks over the decommissioned data-nodes. */
		private int numDecommissionedBlocks;
		/** Sum of the work returned by computeDatanodeWork() calls. */
		private int numPendingBlocks;

		ReplicationStats(List<String> args) {
			super();
			numThreads = 1;
			numDatanodes = 3;
			nodesToDecommission = 1;
			nodeReplicationLimit = 100;
			totalBlocks = 100;
			parseArguments(args);
			// The formula is
			// 2 * (totalBlocks * replication * nodesToDecommission / numDatanodes)
			// / numDatanodes. That is twice the replicas held by the
			// decommissioned nodes (assuming an even spread), divided by the
			// number of data-nodes.
			// NOTE(review): an older comment here said "4 times"; the code has
			// always used 2 -- confirm which factor is intended.
			numOpsRequired = (totalBlocks * replication * nodesToDecommission * 2)
					/ (numDatanodes * numDatanodes);

			String[] blkReportArgs = { "-op", "blockReport", "-datanodes",
					String.valueOf(numDatanodes), "-blocksPerReport",
					String.valueOf(totalBlocks * replication / numDatanodes),
					"-blocksPerFile", String.valueOf(numDatanodes) };
			blockReportObject = new BlockReportStats(
					Arrays.asList(blkReportArgs));
			numDecommissionedBlocks = 0;
			numPendingBlocks = 0;
		}

		String getOpName() {
			return OP_REPLICATION_NAME;
		}

		void parseArguments(List<String> args) {
			boolean ignoreUnrelatedOptions = verifyOpArgument(args);
			for (int i = 2; i < args.size(); i++) { // parse command line
				if (args.get(i).equals("-datanodes")) {
					if (i + 1 == args.size())
						printUsage();
					numDatanodes = Integer.parseInt(args.get(++i));
				} else if (args.get(i).equals("-nodesToDecommission")) {
					if (i + 1 == args.size())
						printUsage();
					nodesToDecommission = Integer.parseInt(args.get(++i));
				} else if (args.get(i).equals("-nodeReplicationLimit")) {
					if (i + 1 == args.size())
						printUsage();
					nodeReplicationLimit = Integer.parseInt(args.get(++i));
				} else if (args.get(i).equals("-totalBlocks")) {
					if (i + 1 == args.size())
						printUsage();
					totalBlocks = Integer.parseInt(args.get(++i));
				} else if (args.get(i).equals("-replication")) {
					if (i + 1 == args.size())
						printUsage();
					replication = Short.parseShort(args.get(++i));
				} else if (!ignoreUnrelatedOptions)
					printUsage();
			}
		}

		/**
		 * Builds the replication workload.
		 *
		 * It starts the data-nodes, creates the files and block reports, and
		 * stops the replication monitor threads. It then sends one block
		 * report per data-node, decommissions data-nodes and sets the node
		 * replication limit.
		 *
		 * @throws IOException
		 *             on name-node failures, or an
		 *             {@link java.io.InterruptedIOException} if interrupted
		 *             while waiting for the monitor threads to stop
		 */
		void generateInputs(int[] ignore) throws IOException {
			// start data-nodes; create a bunch of files; generate block
			// reports.
			blockReportObject.generateInputs(ignore);
			// stop replication monitor; executeOp() calls
			// computeDatanodeWork() directly instead
			nameNode.namesystem.underreplthread.interrupt();
			nameNode.namesystem.overreplthread.interrupt();
			try {
				nameNode.namesystem.underreplthread.join();
				nameNode.namesystem.overreplthread.join();
			} catch (InterruptedException ie) {
				// Returning silently would leave the inputs half-built (no
				// block reports, no decommissioning). Preserve the interrupt
				// status and fail instead.
				Thread.currentThread().interrupt();
				java.io.InterruptedIOException iioe = new java.io.InterruptedIOException(
						"Interrupted while stopping the replication monitor");
				iioe.initCause(ie);
				throw iioe;
			}
			// report blocks once
			int nrDatanodes = blockReportObject.getNumDatanodes();
			for (int idx = 0; idx < nrDatanodes; idx++) {
				blockReportObject.executeOp(idx, 0, null);
			}
			// decommission data-nodes
			decommissionNodes();
			// set node replication limit
			nameNode.namesystem.setNodeReplicationLimit(nodeReplicationLimit);
		}

		/**
		 * Writes the names of the last {@code nodesToDecommission} data-nodes
		 * to the exclude file and asks the name-node to refresh its node
		 * lists.
		 *
		 * The exclude file name comes from {@code dfs.hosts.exclude},
		 * defaulting to {@code exclude}. The method also sums the blocks held
		 * by those data-nodes into {@code numDecommissionedBlocks}.
		 */
		private void decommissionNodes() throws IOException {
			String excludeFN = config.get("dfs.hosts.exclude", "exclude");
			FileOutputStream excludeFile = new FileOutputStream(excludeFN);
			try {
				excludeFile.getChannel().truncate(0L);
				int nrDatanodes = blockReportObject.getNumDatanodes();
				numDecommissionedBlocks = 0;
				for (int i = 0; i < nodesToDecommission; i++) {
					TinyDatanode dn = blockReportObject.datanodes[nrDatanodes - 1
							- i];
					numDecommissionedBlocks += dn.nrBlocks;
					excludeFile.write(dn.getName().getBytes());
					excludeFile.write('\n');
					LOG.info("Datanode " + dn.getName() + " is decommissioned.");
				}
			} finally {
				// close even if a write fails, so the descriptor is not leaked
				excludeFile.close();
			}
			nameNode.refreshNodes();
		}

		/**
		 * Does not require the argument
		 */
		String getExecutionArgument(int daemonId) {
			return null;
		}

		/**
		 * Runs one round of computeDatanodeWork() and times it.
		 *
		 * The daemon is terminated once no work is returned.
		 *
		 * @return elapsed milliseconds for the computeDatanodeWork() call
		 */
		long executeOp(int daemonId, int inputIdx, String ignore)
				throws IOException {
			assert daemonId < numThreads : "Wrong daemonId.";
			long start = System.currentTimeMillis();
			// compute data-node work
			int work = nameNode.namesystem.computeDatanodeWork();
			long end = System.currentTimeMillis();
			numPendingBlocks += work;
			if (work == 0)
				daemons.get(daemonId).terminate();
			return end - start;
		}

		/**
		 * Writes the inputs and the replication summary to both the log and
		 * the results stream, with the collected statistics in between.
		 */
		void printResults() {
			// builds "(n0, n1, ...)"; yields "()" when there are no data-nodes
			StringBuilder blockDistribution = new StringBuilder("(");
			int nrDatanodes = blockReportObject.getNumDatanodes();
			for (int idx = 0; idx < nrDatanodes; idx++) {
				if (idx > 0)
					blockDistribution.append(", ");
				blockDistribution.append(blockReportObject.datanodes[idx].nrBlocks);
			}
			blockDistribution.append(')');

			String[] inputs = { "--- " + getOpName() + " inputs ---",
					"numOpsRequired = " + numOpsRequired,
					"datanodes = " + numDatanodes + " " + blockDistribution,
					"decommissioned datanodes = " + nodesToDecommission,
					"datanode replication limit = " + nodeReplicationLimit,
					"total blocks = " + totalBlocks };
			for (String line : inputs)
				LOG.info(line);
			for (String line : inputs)
				res.println(line);
			printStats();

			String[] summary = {
					"decommissioned blocks = " + numDecommissionedBlocks,
					"pending replications = " + numPendingBlocks,
					"replications per sec: " + getBlocksPerSecond() };
			for (String line : summary)
				LOG.info(line);
			for (String line : summary)
				res.println(line);
		}

		/** Scheduled replications per second; 0 if no time has elapsed. */
		private double getBlocksPerSecond() {
			return elapsedTime == 0 ? 0 : 1000 * (double) numPendingBlocks
					/ elapsedTime;
		}

	} // end ReplicationStats

	/**
	 * Prints the usage of every supported operation to standard error and
	 * terminates the JVM with exit status -1.
	 */
	static void printUsage() {
		String[] operationUsages = { OperationStatsBase.OP_ALL_USAGE,
				CreateFileStats.OP_CREATE_USAGE, OpenFileStats.OP_OPEN_USAGE,
				DeleteFileStats.OP_DELETE_USAGE,
				RenameFileStats.OP_RENAME_USAGE,
				BlockReportStats.OP_BLOCK_REPORT_USAGE,
				ReplicationStats.OP_REPLICATION_USAGE,
				CleanAllStats.OP_CLEAN_USAGE };
		StringBuilder message = new StringBuilder("Usage: NNThroughputBenchmark");
		// the first usage follows "\n\t", the rest follow " | \n\t"
		String separator = "\n\t";
		for (String usage : operationUsages) {
			message.append(separator).append(usage);
			separator = " | \n\t";
		}
		System.err.println(message.toString());
		System.exit(-1);
	}

	/**
	 * Main method of the benchmark: runs the operation(s) selected by
	 * {@code -op} and prints their results.
	 *
	 * @param conf
	 *            configuration used to construct the benchmark
	 * @param args
	 *            command line parameters; the first must start with
	 *            {@code -op} and the second names the operation, or
	 *            {@code OperationStatsBase.OP_ALL_NAME} to run all of them
	 * @throws Exception
	 *             any failure while running a benchmark (logged before being
	 *             rethrown)
	 */
	public static void runBenchmark(Configuration conf, List<String> args)
			throws Exception {
		if (args.size() < 2 || !args.get(0).startsWith("-op"))
			printUsage();

		String type = args.get(1);
		boolean runAll = OperationStatsBase.OP_ALL_NAME.equals(type);

		NNThroughputBenchmark bench = null;
		List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
		try {
			bench = new NNThroughputBenchmark(conf);
			if (runAll || CreateFileStats.OP_CREATE_NAME.equals(type))
				ops.add(bench.new CreateFileStats(args));
			if (runAll || OpenFileStats.OP_OPEN_NAME.equals(type))
				ops.add(bench.new OpenFileStats(args));
			if (runAll || DeleteFileStats.OP_DELETE_NAME.equals(type))
				ops.add(bench.new DeleteFileStats(args));
			if (runAll || RenameFileStats.OP_RENAME_NAME.equals(type))
				ops.add(bench.new RenameFileStats(args));
			if (runAll || BlockReportStats.OP_BLOCK_REPORT_NAME.equals(type))
				ops.add(bench.new BlockReportStats(args));
			if (runAll || ReplicationStats.OP_REPLICATION_NAME.equals(type))
				ops.add(bench.new ReplicationStats(args));
			if (runAll || CleanAllStats.OP_CLEAN_NAME.equals(type))
				ops.add(bench.new CleanAllStats(args));
			if (ops.isEmpty())
				printUsage();
			// run each benchmark
			for (OperationStatsBase op : ops) {
				LOG.info("Starting benchmark: " + op.getOpName());
				op.benchmark();
				op.cleanUp();
			}
			// print statistics
			for (OperationStatsBase op : ops) {
				LOG.info("");
				op.printResults();
			}
		} catch (Exception e) {
			LOG.error(StringUtils.stringifyException(e));
			throw e;
		} finally {
			try {
				if (bench != null)
					bench.close();
			} finally {
				// Close the results stream on the failure path too, so that
				// whatever was already written is flushed.
				if (res != null)
					res.close();
			}
		}
	}

	/**
	 * Command-line entry point: runs the benchmark with a default
	 * configuration.
	 *
	 * @param args
	 *            command line parameters, copied into a mutable list
	 */
	public static void main(String[] args) throws Exception {
		Configuration defaultConf = new Configuration();
		List<String> argList = new ArrayList<String>(Arrays.asList(args));
		runBenchmark(defaultConf, argList);
	}
}
